末翼我們將實作兩個小project,來活用前面九翼的內容。
[Day27]與[Day28]為project ECC,目標為實作一個可連接EdgeDB Cloud
的EdgeDB cloud connection
,並完成一個streamlit app作為前端。
Project postman
,目標為研究傳遞decorator factory參數的各種可能方法。於七月底到八月初參加了strealit connections hackathon比賽(因為參加比賽就有送一件薄帽T...),使用EdgeDB
的Blocking API
實作。後來於八月底排到了EdgeDB Cloud
的使用權限,加上又新學了asyncio.TaskGroup
與Exception Groups
,於是便想趁著鐵人賽,使用EdgeDB
的AsyncIO API
來重新實作看看。
EdgeDBCloudConn
的class
,並實作__aenter__
與__aeit__
,使該class
可以作為async context manager
使用。__aenter__
與__aexit__
時,logging
進入訊息。__aexit__
時,logging
實際database呼叫次數及實際在context managet
中所經歷的時間。query
,並於每次query
進行logging
。Read
的query
需有快取機制。transactions
功能。EdgeDB建基於Postgres
之上,有著自己的EdgeQL
語法與type system。其EdgeQL query
會於底層compile為相對應的Postgres query
,由於其不是一個ORM
,所以理論上所有想對Postgres
做的操作,應該都能使用更簡潔的EdgeQL
語法達成。
Co-Founder兼CEO的Yury Selivanov是Python asyncio背後的主要推手,也是asyncio威力加強版uvloop的主要開發者。因此不難想像EdgeDB
從一開始就以async思維開發,故其效率極高。下圖為官方的benchmark。
EdgeDB
各語言的library都在活躍開發中,目前已經支援的有:
其雲端服務EdgeDB Cloud也已進入了Beta版,目前沒有對外開放,但可以加入watchlist或是到discord申請快速通關。
EdgeDB-Python為官方提供的library。由於我們的是目標是建立async的connection,所以直接查找說明文件中AsyncIO API。
經過一番查找之後,發現edgedb.create_async_client
可以幫忙生成AsyncIOClient
的instance
,而AsyncIOClient
提供六種不同情況的query function
:
這幾個功能將是我們建立EdgeDBCloudConn
的好幫手。
ECC
架構如下:
ECC
├── ecc
│ ├── __init__.py
│ ├── connection.py
│ ├── data_structures.py
│ ├── queries.py
│ └── utils.py
│── edgedbcloud.toml
└── tests
├── __init__.py
├── test_healthy.py
├── test_imqry.py
├── tests_mqry.py
├── tests_qry_by_args.py
└── utils.py
ecc/connection.py
:EdgeDBCloudConn
class
。ecc/data_structures.py
:Enum
及Namedtuple
等資料結構。ecc/queries.py
:提供寫好的EdgeQL
。ecc/utils.py
:小工具。內有兩個Enum
及一個NamedTuple
。
RespJson(Enum)
內有NO
及YES
兩個member,並使用enum.auto
為其自動賦值。其功用是用來區別query
是否需要返回json
格式,會於QueryRecord
中使用。
from enum import Enum, auto
class RespJson(Enum):
NO = auto()
YES = auto()
RespConstraint(Enum)
內有FREE
、NO_MORE_THAN_ONE
及EXACTLY_ONE
三個member,並使用enum.auto
為其自動賦值。其功用是用來區別是否需要檢驗query
返回結果的長度,會於QueryRecord
中使用。
...
class RespConstraint(Enum):
FREE = auto()
NO_MORE_THAN_ONE = auto()
EXACTLY_ONE = auto()
QueryRecord(NamedTuple)
qry
(str
):EdgeQL
語法的query
str
。extra_args
(tuple
):當需要Filter
時使用。jsonify
(RespJson
):返回結果是否為json
格式。required_single
(RespConstraint
):是否檢驗返回結果的長度。extra_kwargs
(dict
):當需要Filter
時使用。task_name
(str
):asyncio task
的task name
。...
from typing import NamedTuple
class QueryRecord(NamedTuple):
qry: str
extra_args: tuple[Any, ...]
jsonify: RespJson
required_single: RespConstraint
extra_kwargs: dict[str, Any]
task_name: str
get_logger
為一個輔助function
來幫助我們取得logger instance
。由於logging.getLogger是一個module-level function
,只要名字不變的話,每次呼叫都可以取回同一個instance
,所以不用顯性以參數在各個obj
中傳遞。
def get_logger(logger_name: str = 'edgedb-cloud') -> logging.Logger:
return logging.getLogger(logger_name)
load_toml
建立edgedbcloud.toml
作為設定檔,並於其中定義一個edgedb-cloud
table
,輸入所需的參數。
host
需要由EdgeDB cli登入EdgeDB cloud
後,才能於command line
或REPL
中取得。EdgeDB
預設使用port
5656
。secret_key
可以由指令取得,也可以由EdgeDB Cloud
的UI取得。database
為_example
。其為EdgeDB
提供練習用的database
,可於EdgeDB Cloud
中一鍵生成。ttl
為immutable query
的快取時間。[edgedb-cloud]
host = 'xxx.aws.edgedb.cloud'
port = 5656
secret_key = 'secret_key'
database = '_example'
ttl = 5
load_toml
接受toml_name
與table_name
兩個參數,並回傳一個dict
,其內是toml_name
中table_name
的所有參數。Python於3.11加入了tomllib模組,幫助我們讀取toml
格式的檔案。請注意文件中特別指出tomllib.load
需接受readable
的binary object
,所以於open
的mode
需指定為rb
。
def load_toml(toml_name: str = 'edgedbcloud.toml',
table_name: str = 'edgedb-cloud') -> dict[str, Any]:
with open(toml_name, 'rb') as f:
data: dict[str, dict[str, Any]] = tomllib.load(f)
return data[table_name]
match_func_name
幫助我們選擇AsyncIOClient
query function
的小工具。
AsyncIOClient
的六種query function
可以分成兩個大類:
query
是否需要返回json
格式,可依function
名最後是否有_json
來判斷。function
名為query
開頭。function
名為query_single
開頭。function
名為query_required_single
開頭。於是我們開始思考,如何用jsonify
與required_single
兩個參數來組合出這六個function
呢?此外,又要用什麼來區別各種可能的值呢?神奇的12345
?還是singleton
的True
、False
、None
等?
Python的Enum
可能是一個不錯的解決方法,於是我們在data_structures.py
建立了RespJson
與RespConstraint
兩個Enum
。值得一提的是,因為我們只會比較Enum
member
的entity
,而不會比較其值,所以其值是多少並不重要,這也是為什麼會使用enum.auto
自動賦值的原因。
match_func_name
依靠structural pattern matching
的match enum的功能,來取得相對應的function
名。我們於match_func_name
的最後,有設定一個catch all
的case _
,並於其中使用Python3.11新增的assert_never
。
def match_func_name(jsonify: RespJson, required_single: RespConstraint) -> str:
match (jsonify, required_single):
case (RespJson.NO, RespConstraint.FREE):
func_name = 'query'
case (RespJson.NO, RespConstraint.NO_MORE_THAN_ONE):
func_name = 'query_single'
case (RespJson.NO, RespConstraint.EXACTLY_ONE):
func_name = 'query_required_single'
case (RespJson.YES, RespConstraint.FREE):
func_name = 'query_json'
case (RespJson.YES, RespConstraint.NO_MORE_THAN_ONE):
func_name = 'query_single_json'
case (RespJson.YES, RespConstraint.EXACTLY_ONE):
func_name = 'query_required_single_json'
case _ as unreachable:
assert_never(unreachable)
return func_name
pack_imqry_records
預先打包八種常用的immutable
query
為一list
,作為測試之用。每個query
型式都是一個QueryRecord
的instance
。
def pack_imqry_records() -> list[QueryRecord]:
qries = ['SELECT Movie {title};',
*['''SELECT assert_single(
(SELECT Movie {title, release_year}
FILTER .title=<str>$title and
.release_year=<int64>$release_year));''']*3,
'SELECT Account {username};',
*['''SELECT assert_single(
(SELECT Account {username}
FILTER .username=<str>$username))''']*3]
args_collector = [()]*8
jsons = [*[RespJson.NO]*4, *[RespJson.YES]*4]
required_singles = [RespConstraint.FREE,
*[RespConstraint.NO_MORE_THAN_ONE]*2,
RespConstraint.EXACTLY_ONE]*2
kwargs_collector = [{},
{'title': 'Ant-Man', 'release_year': 2015},
{'title': 'Ant-Man100', 'release_year': 2015},
{'title': 'Ant-Man', 'release_year': 2015},
{},
{'username': 'Alice'},
{'username': 'AliceCCC'},
{'username': 'Alice'}]
task_names = [*[f'QueryMovie{n}' for n in range(4)],
*[f'QueryAccount{n}' for n in range(4)]]
return [QueryRecord(*qr)
for qr in zip(qries,
args_collector,
jsons,
required_singles,
kwargs_collector,
task_names)]
pack_mqry_records
與pack_imqry_records
類似,只是包含的是兩種mutable
query
。
def pack_mqry_records() -> list[QueryRecord]:
qries = ['''WITH p := (INSERT Person {name:=<str>$name})
SELECT p {name};''',
'''WITH p:= (DELETE Person FILTER .name=<str>$name)
SELECT p {name};''']
args_collector = [()]*2
jsons = [RespJson.NO]*2
required_singles = [RespConstraint.FREE]*2
kwargs_collector = [{'name': 'Adam Gramham'}]*2
task_names = ['insert', 'delete']
return [QueryRecord(*qr)
for qr in zip(qries,
args_collector,
jsons, required_singles,
kwargs_collector,
task_names)]
pack_imqry_records_by_args
為測試是否能順利使用像$0
或$1
的語法進行query
, 目前僅包含一個immutable
query
。
def pack_imqry_records_by_args() -> list[QueryRecord]:
qries = ['''SELECT Movie {title, release_year}
FILTER .title=<str>$0 and .release_year=<int64>$1;''']
args_collector = [('Ant-Man', 2015)]
jsons = [RespJson.NO]
required_singles = [RespConstraint.FREE]
kwargs_collector: list[dict[str, Any]] = [{}]
task_names = ['QueryMovieTitleByArgs']
return [QueryRecord(*qr)
for qr in zip(qries,
args_collector,
jsons,
required_singles,
kwargs_collector,
task_names)]
其內只有EdgeDBCloudConn
class
。
當database
在一定時間內,通常不會變動的情況下,可以設定一個快取時間ttl
。在ttl
內如果使用同樣的query
與參數來讀取資料時,可以直接回傳快取結果,而不真正呼叫database
。
但是當database
頻繁變動的話,對這類query
進行快取就有很多眉角要注意。究竟使用者是真的想要快速發出多次同樣的mutable
query
?還是可能因為網路問題或retry
等邏輯沒寫好,不小心發送多次,而我們應該只呼叫一次database
就好?
因此我們決定EdgeDBCloudConn
預設ttl=0
,即沒有快取。當ttl>0
時,會使用alru_cache
來將_imquery
包上快取設定,而_mquery
則一律執行。
AbstractAsyncContextManager
contextlib.AbstractAsyncContextManager是Python提供的abstract base class
。由於EdgeDBCloudConn
class
將會實作__aenter__
與__aexit__
,所以在此繼承AbstractAsyncContextManager
是個絕佳的應用。
class EdgeDBCloudConn(AbstractAsyncContextManager):
...
__init__
共接收七個變數(註2
)。
host
、port
、database
與secret_key
將由load_toml
讀取edgedb.toml
所得,為實際建立連接所需要的參數。ttl
為設定的快取時間,預設為0
,即不快取。logger
為指定的logger instance
。當沒有指定的時候,會呼叫get_logger
來取得一個預設的logger
。log_level
為想要記錄的層級,當沒有指定的時候,設定為logging.INFO
。因為logger
及log_level
,這兩個變數所要傳遞的值比較明確,所以我們使用了or
的語法,而不顯性比較是否為None
。
此外:
self._client
將會是AsyncIOClient
建立的client
之變數名,先預設為None
。self._start
為計算進入__aenter__
與離開__aexit__
所用時間之用。self._dbcalls
為計算於__aenter__
與__aexit__
中實際呼叫database
的次數之用。self._total_dbcalls
為計算實際呼叫database
的總次數之用。最後一個if
是用來設定immutable query
的快取機制。這個手法相當微妙,這使得我們將會由self.__dict__
中來存取self._imquery
。現在的情況是:
self._imquery
為一般function
,只是non-data descriptor
而不是data descriptor
,所以當我們使用self._imquery = alru_cache(ttl=ttl)(self._imquery)
的語法時,相當於在self.__dict__
中,加入一個已經包過alru_cache(ttl=ttl)
的_imquery
。EdgeDBCloudConn.__dict__
中,還是保有原來沒加上alru_cache
的_imquery
。 def __init__(self,
*,
host: str,
port: int,
database: str,
secret_key: str,
ttl: float = 0,
logger: logging.Logger | None = None,
log_level: int | None = None) -> None:
self._host = host
self._port = port
self._database = database
self._secret_key = secret_key
self._logger = logger or get_logger()
self._log_level = log_level or logging.INFO
self._logger.setLevel(self._log_level)
self._client: EdgeDBAsyncClient | None = None
self._start = 0.0
self._dbcalls = 0
self._total_dbcalls = 0
if ttl > 0:
self._imquery = alru_cache(ttl=ttl)(self._imquery)
client
為一property
,用來包住底層的self._client
。由於我們希望只建立一個client
,所以每當self.client
被呼叫時,我們會先檢查self._client
是否為None
,如果是的話,表示我們還沒有建立client
,此時會先呼叫edgedb.create_async_client
並搭配由load_toml
所提供的各個參數來建立async-client
。由於在建立client
後,就不需要用到self._secret_key
,與其讓它待在instance
內,我們選擇刪除它,最後回傳self._client
。
class EdgeDBCloudConn(AbstractAsyncContextManager):
...
@property
def client(self) -> EdgeDBAsyncClient:
if self._client is None:
self._client = edgedb.create_async_client(host=self._host,
port=self._port,
database=self._database,
secret_key=self._secret_key)
del self._secret_key
return self._client
_get_client_qry_func
幫助我們實際由self.client
,取到名字為match_func_name
回傳值的function
。
class EdgeDBCloudConn(AbstractAsyncContextManager):
...
def _get_client_qry_func(self,
jsonify: RespJson,
required_single: RespConstraint) -> Callable[..., Any]:
return getattr(self.client, match_func_name(jsonify, required_single))
get_cur_timestamp
為回傳timestamp
的function
,可以幫助計算快取時間。get_cur_timestamp
雖與self
或cls
的狀態無關,卻是EdgeDBCloudConn
class
的好幫手,所以我們設計其為static method。
class EdgeDBCloudConn(AbstractAsyncContextManager):
...
@staticmethod
def get_cur_timestamp() -> float:
return datetime.now().timestamp()
_is_qry_immutable
是用來判斷我們的query
內是否含有可能會mutate
database的關鍵字。我們定義當query
內含有insert
、update
或 delete
時,就將此query
判定為會mutate
database
。由於_mutated_kws
不會隨著不同instance
而改變,所以我們將其設為class variable
。
class EdgeDBCloudConn(AbstractAsyncContextManager):
_mutated_kws = ('insert', 'update', 'delete')
...
def _is_qry_immutable(self, qry: str) -> bool:
return all(mutated_kw not in qry.casefold()
for mutated_kw in self._mutated_kws)
query
、_query
、_imquery
與_mquery
這四個都是async
function
。query
會依照_is_qry_immutable
判斷該將query
轉到_imquery
或是_mquery
。而_imquery
與_mquery
程式其實完全一樣,都是再將query
轉到_query
。這樣的用意是方便我們於__init__
中於ttl>0
時,可以動態加上alru_cache
至_imquery
。
class EdgeDBCloudConn(AbstractAsyncContextManager):
...
async def query(self,
qry: str,
*args: Any,
jsonify: RespJson = RespJson.NO,
required_single: RespConstraint = RespConstraint.FREE,
**kwargs: Any) -> QueryResult:
if self._is_qry_immutable(qry):
return await self._imquery(qry,
*args,
jsonify=jsonify,
required_single=required_single,
**kwargs)
return await self._mquery(qry,
*args,
jsonify=jsonify,
required_single=required_single,
**kwargs)
async def _imquery(self,
qry: str,
*args: Any,
jsonify: RespJson = RespJson.NO,
required_single: RespConstraint = RespConstraint.FREE,
**kwargs: Any) -> QueryResult:
return await self._query(qry,
*args,
jsonify=jsonify,
required_single=required_single,
**kwargs)
async def _mquery(self,
qry: str,
*args: Any,
jsonify: RespJson = RespJson.NO,
required_single: RespConstraint = RespConstraint.FREE,
**kwargs: Any) -> QueryResult:
return await self._query(qry,
*args,
jsonify=jsonify,
required_single=required_single,
**kwargs)
_query
中:
logging.info
記錄其實際被呼叫。self._dbcalls
加上1
。_get_client_qry_func
取回的function
,搭配qry
、*args
及**kwargs
進行呼叫,並回傳計算結果。class EdgeDBCloudConn(AbstractAsyncContextManager):
...
async def _query(self,
qry: str,
*args: Any,
jsonify: RespJson = RespJson.NO,
required_single: RespConstraint = RespConstraint.FREE,
**kwargs: Any) -> QueryResult:
self._logger.info(self._fmt_query_log_msg(
qry, args, jsonify, required_single, kwargs))
self._dbcalls += 1
qry_func = self._get_client_qry_func(jsonify, required_single)
return await qry_func(qry, *args, **kwargs)
__aenter__
logging.info
記錄。self.get_cur_timestamp()
將其值賦予self._start
。class EdgeDBCloudConn(AbstractAsyncContextManager):
...
async def __aenter__(self) -> Self:
self._logger.info(self._fmt_enter_aenter_log_msg())
self._start = self.get_cur_timestamp()
return self
__aexit__
await asyncio.sleep(1e-5)
的原因是,這樣可以方便UI可以即時更新。logging.info
,一次記錄已經進入__aexit__
,另一次記錄實際呼叫database
次數。exception
發生的話,以logging.error
記錄。self._dbcalls
加總至self._total_dbcalls
後,呼叫self._reset_db_calls
重設self._dbcalls
為0
。__aexit__
前,計算在with
中的時間,並以logging.info
記錄。__aexit__
,呼叫self._reset_start
重設self._start
為0.0
。class EdgeDBCloudConn(AbstractAsyncContextManager):
...
async def __aexit__(self,
exc_type: type[BaseException] | None,
exc_value: BaseException | None,
exc_tb: TracebackType | None) -> None:
await asyncio.sleep(1e-5)
self._logger.info(self._fmt_enter_aexit_log_msg())
self._logger.info(self._fmt_db_calls_log_msg())
if exc_type:
self._logger.error(self._fmt_aexit_exception_log_msg(exc_value))
self._total_dbcalls += self._dbcalls
self._reset_db_calls()
elapsed = self.get_cur_timestamp() - self._start
self._logger.info(self._fmt_exit_aexit_log_msg(elapsed))
self._reset_start()
aclose
可以顯式呼叫底層client
的self.client.aclose
。請注意我們在__aexit__
內,並沒有顯式的呼叫await self.aclose()
,原因是現在很多database都具有pooling的機制,包括EdgeDB。
async def aclose(self, timeout: float = 5) -> None:
print('aclose called')
await asyncio.wait_for(self.client.aclose(), timeout)
_healthy_check_url
與is_healthy
_healthy_check_url
是EdgeDB
內建能判斷database
現在狀態的url
(註3
)。
is_healthy
針對self._healthy_check_url
發出GET
request,並檢查返回status_code
是否為200
,來判斷database
是否健康。如果遭遇到任何錯誤,設定回傳False
。
class EdgeDBCloudConn(AbstractAsyncContextManager):
...
@property
def _healthy_check_url(self) -> str:
return f'https://{self._host}:{self._port}/server/status/alive'
@property
def is_healthy(self) -> bool:
"""https://www.edgedb.com/docs/guides/deployment/health_checks#health-checks"""
try:
return httpx.get(self._healthy_check_url,
follow_redirects=True,
verify=False,
timeout=30).status_code == 200
except httpx.HTTPError:
return False
_fmt_*
開頭的多個function
皆為返回str
,供logger
所用,在此不加贅述。
_reset_*
開頭的多個function
為重設時間或資料庫呼叫次數所用。
總共有四個TestClass
,都是實際對database
進行query
,沒有mocking
(註4
)。
TestHealthy
測試_healthy_check_url
及is_healthy
。TestImqryCachedConn
測試在少量及大量query
時都有進行快取。TestImqryNonCachedConn
測試其沒有快取機制。TestMqryConn
測試其沒有快取機制,且可正常insert
及delete
。註1:由於這個project的目標是建立與EdgeDB cloud
連結的connection
,而不是學習EdgeQL
的語法。如果是對其語法有興趣的朋友:
Rust
影片解說,此外他也是Learn Rust in a Month of Lunches的作者。註2:如果您想要的是一個可以連接local
或是host
在其它雲端的EdgeDB
connection
,需要考慮較複雜的建立方式,包括支援DSN。
註3:如果這是連接local
或是ip
的connection
,此healthy_check_url
網址可能使用http
而非https
。
註4:當在Windows上測試時,會出現ResourceWarning: Enable tracemalloc to get the object allocation traceback
,需要再研究資源關閉的問題。